package org.databandtech.job.sink;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.databandtech.job.entity.SaveMeta;

public class MysqlSink implements SinkFunction<String> {

	private static final long serialVersionUID = 1L;
	Connection connection = null;
    PreparedStatement preparedStatement = null;
    SaveMeta saveMeta;
    public MysqlSink(SaveMeta saveMeta) {
		super();
		this.saveMeta = saveMeta;
	}

    @Override
    public void open() throws Exception {
        Class.forName( "com.mysql.jdbc.Driver" );
        connection = DriverManager.getConnection( saveMeta.getUrl(), saveMeta.getUsername(), saveMeta.getPassword() );
        preparedStatement = connection.prepareStatement(saveMeta.getSql());
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
        if (preparedStatement != null) {
            preparedStatement.close();
        }
    }

    @Override
    public void invoke(String json) throws Exception {
        try {
            preparedStatement.setString( 1, json );
            preparedStatement.executeUpdate();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
